#!/usr/bin/env python from collections import defaultdict import math import scipy.io.wavfile import scipy import keras from keras.models import Sequential from keras.layers import Dense, Dropout, Flatten from keras.layers import Conv1D, MaxPooling1D, LSTM from keras.layers import TimeDistributed, BatchNormalization, Activation # This script will evaluate the networks trained by recurrent_convolutional.py # It expects the stored weights in the same directory and with the same names as generated by the above script # Correct numbers of predictions for each epoch will also be outputted in the files rec_testcorrect and rec_traincorrect in case # training set evalution is enabled ######## EDITABLE PARAMETERS ####### # POINT TO TRAINING CSV train_csv = 'train.csv' # POINT TO TEST CSV test_csv = 'test.csv' # POINT TO FOLDER WITH WAV FILES wav_directory = 'wavsongs/' # NUMBER OF SAMPLES IN THE WAV FILES nrsamples = 465984 # HOW LARGE THE SEGMENTATION WINDOW SHOULD BE segmentsize = 59049 # CONTROLS THE WINDOW STRIDE DURING SEGMENTATION stride = segmentsize # HOW MANY EPOCHS TO EVALUATE epochs = 50 # FILTERS FOR CONVOLUTIONAL LAYERS, ENSURE THERE ARE log(segmentsize)/log(3) FILTER SIZES convFilters = [64,64,128,128,128,128,128,128,256,256] # NODES IN LSTM LAYER LSTMnodes = 512 # SET TO TRUE IF TRAINING SET ALSO NEEDS TO BE EVALUATED evaluate_training = False # LOWER THESE WHEN THERE IS NOT ENOUGH MEMORY AVAILABLE maxinmem = 800 #################################### class Clip: def __init__(self, songinfo): self.artist = songinfo[0] self.title = songinfo[1] self.album = songinfo[2] self.path = songinfo[3] def asString(self): return self.artist + ' - ' + self.title + ', ' + self.album + ', path: ' + self.path def GetTrainingAndTestData(): train = open(train_csv) songsPerArtist = defaultdict(int) clips = [] artistIndex = dict() for line in train: songinfo = line.strip('\n').split('\t') songsPerArtist[songinfo[0]] += 1 if(songsPerArtist[songinfo[0]] == 1): artistIndex[songinfo[0]] = len(artistIndex) newClip = Clip(songinfo) clips.append(newClip) train.close() test = open(test_csv) testClips = [] for line in test: songinfo = line.strip('\n').split('\t') newClip = Clip(songinfo) songsPerArtist[songinfo[0]] += 1 if(songsPerArtist[songinfo[0]] == 1): artistIndex[songinfo[0]] = len(artistIndex) print('New artist introduced in test set: ' + newClip.asString()) exit() testClips.append(newClip) return artistIndex, clips, testClips def GetSegmentedWav(clip, segmentsize, nrsamples, stride): (rate, wavdata) = scipy.io.wavfile.read(wav_directory + clip.path) segments = segments = int(math.ceil( (nrsamples-segmentsize) / float(stride))) segwav = scipy.zeros((segments,segmentsize), dtype=scipy.int16) for i in range(0,segments): segwav[i] = wavdata[i*stride:i*stride+segmentsize] return segwav.reshape(segments,segmentsize,1) def GetSlices(clips, maxperslice): slices = [((i*maxperslice), ((i+1)*maxperslice)) for i in range(0,clips/maxperslice)] slices.append( (slices[-1][1], clips)) return slices; def CreateModel(segmentsize, segments, classes): model = Sequential() model.add(TimeDistributed(Conv1D(convFilters[0],3,strides=3),input_shape=(segments,segmentsize,1))) model.add(TimeDistributed(BatchNormalization())) model.add(TimeDistributed(Activation('relu'))) for i in range(1, len(convFilters)): model.add(TimeDistributed(Conv1D(convFilters[i],3,padding='same'))) model.add(TimeDistributed(BatchNormalization())) model.add(TimeDistributed(Activation('relu'))) model.add(TimeDistributed(MaxPooling1D(3))) # After final convolutional layer flatten the output for fully connected recurrent layer model.add(TimeDistributed(Flatten())) #Add dropout to combat overfitting model.add(TimeDistributed(Dropout(0.5))) model.add(LSTM(LSTMnodes)) model.add(Dense(classes)) model.add(BatchNormalization()) model.add(Activation('softmax')) model.compile('Adam', 'categorical_crossentropy') return model segments = int(math.ceil( (nrsamples-segmentsize) / float(stride))) epochstart = 0 (artistIndex, train, test) = GetTrainingAndTestData() rArtistIndex = dict() for i in artistIndex: rArtistIndex[artistIndex[i]] = i nrtrain = len(train) nrtest = len(test) nrartists = len(artistIndex) testcorrect = [] testIn = scipy.array([GetSegmentedWav(test[i], segmentsize, nrsamples, stride) for i in range(0,nrtest)]) testLabel = scipy.array([ artistIndex[test[i].artist] for i in range(0,nrtest)]) for epoch in range(0,epochs): print('epoch: {}'.format(epoch)) testecorrect = 0 filename = 'rec_epoch_' + str(epoch) + '_weights.hd5' model = CreateModel(segmentsize,segments,nrartists) model.load_weights(filename) testOut = model.predict(testIn,batch_size=5) for i in range(0,testOut.shape[0]): p = scipy.argsort(testOut[i])[::-1] if(testLabel[i] == p[0]): testecorrect += 1 #print('Real: ' + rArtistIndex[testLabel[i]], ', Predicted: ' + rArtistIndex[p[0]] + ', ' + rArtistIndex[p[1]] + ', ' + rArtistIndex[p[2]] ) testcorrect.append(testecorrect) print("Test Correct: {}".format(testecorrect)) print(testcorrect) tosave = scipy.array(testcorrect, dtype=scipy.int16) scipy.savetxt('rec_testcorrect', tosave, fmt='%d') if(evaluate_training): traincorrect = [] slices = GetSlices(nrtrain, maxinmem) for epoch in range(0,epochs): print('epoch: {}'.format(epoch)) trainecorrect = 0 filename = 'rec_epoch_' + str(epoch) + '_weights.hd5' model = CreateModel(segmentsize,segments,nrartists) model.load_weights(filename) c = 0 tot = 0 for sli in slices: c += 1 print('{}: Predicting clips {} - {} '.format(c, tot, tot + sli[1] - sli[0] - 1)) tot += sli[1] - sli[0] batch = scipy.array([GetSegmentedWav(train[i],segmentsize,nrsamples,stride) for i in range(sli[0],sli[1])]) batchLabel = scipy.array([artistIndex[train[i].artist] for i in range(sli[0],sli[1])]) batchOut = model.predict(batch) for i in range(0,batchOut.shape[0]): p = scipy.argsort(batchOut[i])[::-1] if(batchLabel[i] == p[0]): trainecorrect += 1 #print('Real: ' + rArtistIndex[testLabel[i]], ', Predicted: ' + rArtistIndex[p[0]] + ', ' + rArtistIndex[p[1]] + ', ' + rArtistIndex[p[2]] ) traincorrect.append(trainecorrect) print("Train Correct: {}".format(trainecorrect)) print(traincorrect) tosave = scipy.array(traincorrect, dtype=scipy.int16) scipy.savetxt('rec_traincorrect', tosave, fmt='%d')